/*
    Copyright 2006-2012 Patrik Jonsson, sunrise@familjenjonsson.org

    This file is part of Sunrise.

    Sunrise is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    Sunrise is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with Sunrise.  If not, see <http://www.gnu.org/licenses/>.

*/

/** \file

    Declaration of the mpi_master class.
*/


#ifndef __mpi_master__
#define __mpi_master__

#include "config.h"

#include <cassert>

#include <tbb/atomic.h>

namespace mcrx {
  // forward decl of xfer class
  template<typename, typename> class xfer;

  template<typename> class mpi_master;
};

#ifdef HAVE_MPI

#include <boost/thread.hpp>
#include <tbb/tick_count.h>
#include <vector>
#include <boost/mpi/communicator.hpp>

template<typename xfer_type>
class mcrx::mpi_master {
public:
  typedef xfer_type T_xfer;
  typedef typename T_xfer::T_grid T_grid;
  typedef typename T_xfer::T_queue_item T_queue_item;

private:

  /** Comm tags. The location tags are handled by the
      request_thread2tag and response_thread2tag functions. */
  enum {ray_tag=11, finish_tag=12, handshake_tag=13, skeleton_tag=14, 
	master_finish_tag=15 };

  /// The max number of items on the queue
  static const int max_queue_depth_ = 10000;

  /// Ensures the "send buffer full" diagnostic is only printed once.
  bool send_buffer_full_printed_;
  /// Local/global termination flags for the finish protocol.
  bool we_finished_, sent_finish_, all_finished_;
  /// Time at which this task entered the finishing state; unset until then.
  boost::optional<tbb::tick_count> finish_time_;
  /// Grace period (seconds) to keep servicing messages after finishing.
  // Fixed: an in-class initializer on a non-integral static const
  // member is a GCC extension and ill-formed in ISO C++; since C++11
  // this must be constexpr (which, from C++17, also makes it an
  // inline variable needing no out-of-line definition).
  static constexpr double finish_buffer_time_ = 1.0;

  /// The transfer object driving the calculation.
  T_xfer& x;
  /// Number of worker threads on this task.
  int n_threads_;
  /// Shared ray counter, updated by the workers.
  tbb::atomic<long>& n_rays_;
  long n_rays_desired_;
  
  double min_idle_time_;
  tbb::tick_count last_action_time_;
  /// Counters of rays received from / sent to other tasks.
  long n_received_;
  tbb::atomic<long> n_sent_;
  int i_hpmdump_;
  enum stages { Misc, Handshake, Receive, Receive_test, Location, Location_test };

  boost::mpi::communicator world;

  /// Outstanding MPI requests for incoming location requests. 
  std::vector<boost::mpi::request> location_reqs_;
  /// In-flight sends of location responses (unset when slot is free).
  std::vector<boost::optional<boost::mpi::request> > location_send_reqs_;
  /// The data for incoming location requests. This is ordered such
  /// that a request from task i, thread j is in i*n_threads+j.
  std::vector<typename T_grid::T_location_request> location_data_;
  /// Buffers for the outgoing location responses (same ordering).
  std::vector<typename T_grid::T_location_response> location_send_data_;
  std::vector<boost::mpi::status> location_stata_;
  long n_resp_;

  /// Outstanding MPI requests for incoming ray receives.
  std::vector<boost::mpi::request> ray_reqs_;
  /// Buffer for the incoming ray receives
  std::vector<T_queue_item> ray_recv_data_;

  /// Struct containing the send data used by the threads. This is
  /// stored in a thread_specific_pointer.
  class thread_data {
  public:
    /// Outstanding MPI requests for outgoing ray sends
    boost::mpi::request send_req;
    /// Buffer for the outgoing send items
    T_queue_item send_data;
    
    /// Destructor cancels the MPI request.
    ~thread_data() {send_req.cancel(); send_req.wait(); };
  };
  /// Per-thread send state; created lazily for each worker thread.
  boost::thread_specific_ptr<thread_data> tdata_;

  /** Outstanding MPI requests for ray push handshakes. Each task
      listens to all other tasks for messages that enable/disable
      sending of rays to that task. */
  std::vector<boost::mpi::request> handshake_reqs_;
  std::vector<boost::optional<boost::mpi::request> > handshake_send_reqs_;
  /** Handshake payloads. Can't make a vector of volatile ints, so
      plain ints are used; the values are only touched through MPI. */
  std::vector<int> handshake_data_;
  std::vector<int> handshake_send_data_;
  bool recv_enabled_;
  /// False if we have gotten a stop message from *any* task.
  volatile bool send_enabled_;

  /** Vector of request stata, used when calling test_some on request
      vectors. */
  std::vector<boost::mpi::status> req_stata_;
  std::vector<boost::mpi::status> hskreq_stata_;

  /// MPI request for the send to the master that worker is finished
  boost::optional<boost::mpi::request> finish_send_req_;
  /// MPI request for the message from the master that all are finished.
  boost::mpi::request finish_req_;
  /** Request objects for the message indicating that the tasks are
      finished. This is only allocated on task 0. */
  std::vector<boost::mpi::request> master_finish_reqs_;

public:
  mpi_master (T_xfer&, int, tbb::atomic<long>&, long);
  ~mpi_master();

  /// Set up the receive buffers for incoming rays.
  void init_ray_data();
  /// Post the persistent nonblocking receives listened to by this task.
  void setup_requests();
  void process_handshakes();
  /// Broadcast an enable/disable-sending handshake to the other tasks.
  void send_handshake(bool enable);
  bool process_incoming_rays();
  //bool process_outgoing_rays();
  bool process_locations();
  void terminate_workers();
  /// Check whether the finish protocol has completed.
  bool test_finish();
  /// Main service loop, run until all tasks are finished.
  void run();

  /// Send a ray item to another task from a worker thread.
  void thread_send_ray(const T_queue_item& item, 
		       int thread, int dest);

  T_xfer& xfer() const { return x; };
  int task() const { return world.rank(); };
  int n_tasks() const { return world.size(); };
  int n_threads() const { return n_threads_; };
  /// Reference is volatile so workers see handshake-driven updates.
  const volatile bool& send_enabled() const { return send_enabled_; };
};

#else // HAVE_MPI

/** Stub version of the mpi_master class when not using MPI. It
    presents the same interface as the MPI version so calling code
    needs no conditionals; pushing rays to another task is never valid
    in a serial build. */
template<typename xfer_type>
class mcrx::mpi_master {
public:
  typedef xfer_type T_xfer;
  typedef typename T_xfer::T_grid T_grid;
  typedef typename T_xfer::T_queue_item T_queue_item;

private:
  /// The transfer object driving the calculation.
  T_xfer& x;
  /// Number of worker threads on this task.
  int n_threads_;
  /// Shared ray counter, updated by the workers.
  tbb::atomic<long>& n_rays_;
  long n_rays_desired_;

public:
  mpi_master (T_xfer&, int, tbb::atomic<long>&, long);

  /// Sending a ray to another task is a logic error without MPI;
  /// aborts if ever called. (Requires <cassert>, now included above.)
  void thread_send_ray(const T_queue_item& item, 
		       int thread, int dest) {assert(0 && "thread_send_ray requires MPI"); };
  void run();

  T_xfer& xfer() const { return x; };
  // NOTE(review): mpi_rank()/mpi_size() are presumably serial stubs
  // defined elsewhere in the project — confirm.
  int task() const { return mpi_rank(); };
  int n_tasks() const { return mpi_size(); };
  int n_threads() const { return n_threads_; };
  /// Without MPI there is no cross-task backpressure, so sending is
  /// always enabled.
  bool send_enabled() const { return true; };
  void terminate_workers();
};

#endif // HAVE_MPI

#endif
